package com.simafei.flow.core;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ConcurrentHashSet;
import com.simafei.flow.core.common.ExecStatus;
import com.simafei.flow.core.common.FlowStatus;
import com.simafei.flow.core.common.NodeType;
import com.simafei.flow.core.exception.StartNodeNotFoundException;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

/**
 * @author fengpengju
 */
@Getter
@Slf4j
@ToString(exclude = {"edges", "nodes"})
public class Flow {

    /**
     * 决策流ID
     */
    @Setter
    private String id;

    /**
     * flow节点列表
     */
    @Setter
    private List<Node> nodes;

    /**
     * flow连线列表
     */
    private List<Edge> edges;


    public void addEdge(Edge edge) {
        if (edges == null) {
            edges = new ArrayList<>();
        }
        edges.add(edge);

        Node from = edge.getFrom();
        Objects.requireNonNull(from);
        from.addOutEdge(edge);

        Node to = edge.getTo();
        Objects.requireNonNull(to);
        to.addInEdge(edge);
    }

    public void addNode(Node node) {
        if (nodes == null) {
            nodes = new ArrayList<>();
        }
        nodes.add(node);
    }

    public void setEdges(List<Edge> edges) {
        for (Edge edge : edges) {
            addEdge(edge);
        }
    }

    public FlowResult execute(FlowExecution execution, Map<String, Object> params) {
        return doExecute(execution, params, (future, context) -> {
            future.join();
            return finishExecuted(context);
        });
    }

    public void executeAsync(FlowExecution execution, Map<String, Object> params) {
        doExecute(execution, params, (future, context) -> {
            future.thenRun(() -> finishExecuted(context));
            return null;
        });
    }

    private FlowResult doExecute(FlowExecution execution, Map<String, Object> params, BiFunction<CompletableFuture<Void>, ExecutionContext, FlowResult> function) {
        ExecutionContext context = buildContext(execution, params);
        Optional.of(context.getExecutionListener())
                .ifPresent(listener -> listener.beforeFlowExecute(context, Flow.this));
        Optional<Node> startNode = nodes.stream()
                .filter(node -> node.getNodeType() == NodeType.START)
                .findFirst();

        if (startNode.isPresent()) {
            CompletableFuture<Void> future = startNode.get()
                    .execute(context, List.of(params))
                    .exceptionally(throwable -> {
                        Optional.of(context.getExecutionListener())
                                .ifPresent(listener -> listener.onFlowError(context, Flow.this, throwable));
                        return null;
                    });

            return function.apply(future, context);
        } else {
            throw new StartNodeNotFoundException();
        }
    }

    public FlowResult resume(FlowResumption resumption) {
        return doResume(resumption, (future, context) -> {
                    future.join();
                    return finishResume(context);
                }
        );
    }

    public void resumeAsync(FlowResumption resumeContext) {
        doResume(resumeContext, (future, context) -> {
            future.thenRun(() -> finishResume(context));
            return null;
        });
    }

    private FlowResult finishResume(ExecutionContext context) {
        return finishExecuted(context);
    }

    private FlowResult doResume(FlowResumption resumption, BiFunction<CompletableFuture<Void>, ExecutionContext, FlowResult> function) {
        Node node = resumption.getNode();
        Optional<NodeResult> current = resumption.getExecutedNodeResults().stream()
                .filter(nodeResult -> Objects.equals(nodeResult.getNodeId(), node.getId())
                                && nodeResult.getExecStatus() == ExecStatus.WAITING)
                .findFirst();
        List<Map<String, Object>> nodeInputParams;
        if (current.isEmpty()) {
            // 如果没有执行记录，那就是直接从节点恢复，直接把参数作为节点入参
            nodeInputParams = List.of(resumption.getResumeParams());
        } else {
            nodeInputParams = current.get().getInputParams();
        }

        ExecutionContext executionContext = convertContext(resumption);

        CompletableFuture<Void> future = node.execute(executionContext, nodeInputParams, resumption.getParentId());
        future.exceptionally(throwable -> {
            Optional.of(executionContext.getExecutionListener())
                    .ifPresent(listener -> listener.onFlowError(executionContext, Flow.this, throwable));
            return null;
        });
        return function.apply(future, executionContext);
    }

    private FlowResult finishExecuted(ExecutionContext context) {
        Map<String, Object> params = context.getInputParams();
        FlowResult flowResult;
        if (context.getExecStatus() == ExecStatus.FAILED) {
            List<Throwable> causes;
            if (CollectionUtil.isEmpty(context.getNodeResults())) {
                causes = List.of();
            } else {
                causes = context.getNodeResults().stream()
                        .map(NodeResult::getCause)
                        .filter(Objects::nonNull)
                        .collect(Collectors.toList());
            }
            flowResult = FlowResult.builder()
                    .execParam(params)
                    .execStatus(ExecStatus.FAILED)
                    .flowStatus(FlowStatus.REJECT)
                    .spendTime(System.currentTimeMillis() - context.getStartTime())
                    .causes(causes)
                    .build();
        } else {
            List<Map<String, Object>> endResults;
            if (CollectionUtil.isEmpty(context.getNodeResults())) {
                endResults = List.of();
            } else {
                endResults = context.getNodeResults().stream()
                        .filter(nodeResult -> nodeResult.getNodeType() == NodeType.END)
                        .flatMap(nodeResult -> nodeResult.getResults().stream())
                        .flatMap(result -> result.getResults().stream())
                        .collect(Collectors.toList());
            }
            ExecStatus execStatus = calFinalExecStatus(context.getExecStatus());
            flowResult = FlowResult.builder()
                    .execParam(params)
                    .execStatus(execStatus)
                    .flowStatus(calFinalFlowStatus(execStatus, context.getFlowStatus()))
                    .spendTime(System.currentTimeMillis() - context.getStartTime())
                    .execResults(endResults)
                    .build();
        }
        Optional.ofNullable(context.getExecutionListener())
                .ifPresent(listener -> listener.afterFlowExecute(context, this, flowResult));

        return flowResult;
    }

    private ExecStatus calFinalExecStatus(ExecStatus execStatus) {
        if (execStatus == ExecStatus.RUNNING) {
            // 最终状态还是RUNNING，代表流程没执行到结束节点(流程执行到结束节点会设置状态为FINISHED)，流程中断
            return ExecStatus.INTERRUPT;
        } else {
            // 流程执行完成，默认成功，只有指定了拒绝才拒绝
            return execStatus;
        }
    }

    private FlowStatus calFinalFlowStatus(ExecStatus execStatus, FlowStatus flowStatus) {
        if (execStatus == ExecStatus.WAITING) {
            return FlowStatus.UNDETERMINED;
        }
        if (execStatus != ExecStatus.FINISHED) {
            // 流程都没执行完，直接拒绝
            return FlowStatus.REJECT;
        } else {
            // 流程执行完成，默认成功，只有指定了拒绝才拒绝
            return flowStatus == FlowStatus.REJECT ? FlowStatus.REJECT : FlowStatus.PASS;
        }
    }

    private ExecutionContext buildContext(FlowExecution execution, Map<String, Object> params) {
        return ExecutionContext.builder()
                .flowId(this.id)
                .execId(execution.getExecId())
                .test(execution.isTest())
                .executionListener(execution.getExecutionListener())
                .execStatus(ExecStatus.RUNNING)
                .flowStatus(FlowStatus.UNDETERMINED)
                .inputParams(params)
                .startTime(System.currentTimeMillis())
                .build();
    }

    private ExecutionContext convertContext(FlowResumption resumption) {
        Set<NodeResult> nodeResults = new ConcurrentHashSet<>(resumption.getExecutedNodeResults());
        ExecutionContext context = ExecutionContext.builder()
                .flowId(resumption.getFlowId())
                .execId(resumption.getExecId())
                .test(resumption.isTest())
                .nodeResults(nodeResults)
                .executionListener(resumption.getExecutionListener())
                .execStatus(ExecStatus.WAITING)
                .inputParams(resumption.getFlowInputParams())
                .flowStatus(resumption.getFlowStatus())
                .build();
        // 把此次人工节点参数放到属性中，后续执行时，用人工输入的参数作为节点执行结果
        context.setAttributes(resumption.getNode().getId(), resumption.getResumeParams());
        return context;
    }
}
